/*** xrkmonitor license ***

   Copyright (c) 2019 by rockdeng

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.


   字符云监控(xrkmonitor) 开源版 (c) 2019 by rockdeng
   当前版本：v1.0
   使用授权协议： apache license 2.0

   云版本主页：http://xrkmonitor.com

   云版本为开源版提供永久免费告警通道支持，告警通道支持短信、邮件、
   微信等多种方式，欢迎使用

   模块 slog_mtreport_client 功能:
        用于上报除监控系统本身产生的监控点数据、日志，为减少部署上的依赖
		未引入任何第三方组件

****/

#include <string.h>
#include <time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>

#include "sv_socket.h"
#include "mtreport_client.h"

struct MtReportSocket g_socket;

#define SOCKET_BUFF_LEN 8192

int MySocket(int flag)
{
    if(g_socket.iRemoteProtoc == REMOTE_PROTOC_TCP)
        return socket(AF_INET, SOCK_STREAM, flag);
    return socket(AF_INET, SOCK_DGRAM, flag);
}

int InitSocket(OnTimeout_f onTimeout, OnError_f onError, int iProtoc)
{
	if(g_socket.cIsInit)
		return 1;
    if(iProtoc != REMOTE_PROTOC_TCP && iProtoc != REMOTE_PROTOC_UDP)
        return -1;
    g_socket.iRemoteProtoc = iProtoc;
	g_socket.onTimeout = onTimeout;
	g_socket.onError = onError;
	g_socket.cIsInit = 1;
	return 0;
}

int AddSocket(int isock, struct sockaddr_in *paddr, OnPkg_f onPkg, char *priv, int iPrivLen)
{
	if(!g_socket.cIsInit)
		return -1;
	if(g_socket.iNumSock >= MT_SOCKET_DEF_COUNT_MAX)
		return -2;

	int i=0, j=g_socket.iLastFreeIdx;
	for(i=0; i < MT_SOCKET_DEF_COUNT_MAX; i++) {
		if(g_socket.socks[j].isock == 0)
			break;
		else {
			j++;
			if(j >= MT_SOCKET_DEF_COUNT_MAX)
				j=0;
		}
	}
	if(i >= MT_SOCKET_DEF_COUNT_MAX)
		return -3;
    if(priv && iPrivLen > (int)sizeof(g_socket.socks[0].sPrivBuf))
        return -4;

    g_socket.socks[j].Reset();
	g_socket.socks[j].isock = isock;

    if(REMOTE_PROTOC_UDP == g_socket.iRemoteProtoc) {
        g_socket.socks[j].buf = new char[SOCKET_BUFF_LEN];
        if(!g_socket.socks[j].buf)
            return -5;
    }
    else {
        g_socket.socks[j].pcycbuf = new CircularBuffer(SOCKET_BUFF_LEN);
        if(!g_socket.socks[j].pcycbuf)
            return -5;
    }

	if(paddr != NULL)
		memcpy(&(g_socket.socks[j].remote), paddr, MYSIZEOF(struct sockaddr_in));
	else 
		memset(&(g_socket.socks[j].remote), 0, MYSIZEOF(struct sockaddr_in));
	memset(&(g_socket.socks[j].last_recv_remote), 0, MYSIZEOF(struct sockaddr_in));
	g_socket.socks[j].onPkg = onPkg;
	g_socket.socks[j].iMaxRespTimeMs = -1;

    if(priv && iPrivLen > 0) {
        memcpy(g_socket.socks[j].sPrivBuf, priv, iPrivLen);
        g_socket.socks[j].iPrivLen = iPrivLen;
    }

	if(j+1 >= MT_SOCKET_DEF_COUNT_MAX)
		g_socket.iLastFreeIdx = 0;
	else
		g_socket.iLastFreeIdx = j+1;
	if(isock+1 > g_socket.iSockMax)
		g_socket.iSockMax = isock+1;
	FD_SET(isock, &(g_socket.socks_save));
	g_socket.iNumSock++;
	return j;
}

int CheckSocket(time_t tmcur, uint32_t usSelet)
{
	int ret = 0, i = 0, j = 0;

	// check socket recv
	memcpy(&g_socket.socks_use, &g_socket.socks_save, MYSIZEOF(fd_set));
	struct timeval stTimeVal;
	stTimeVal.tv_sec = usSelet/1000000;
	stTimeVal.tv_usec = usSelet%1000000;
	ret = select(g_socket.iSockMax, &g_socket.socks_use, NULL, NULL, &stTimeVal);
	if(ret < 0) {
		if(g_socket.onError)
			g_socket.onError(NULL, tmcur);
	}
	else if (ret==0) {
		if(g_socket.onTimeout)
			g_socket.onTimeout(tmcur);
	}
	else {
		socklen_t iAddrLen = 0;
		for(j=ret, i=0; i < MT_SOCKET_DEF_COUNT_MAX && j > 0; i++)
		{
			if(g_socket.socks[i].isock != 0 && FD_ISSET(g_socket.socks[i].isock, &g_socket.socks_use)) {
				j--;

				iAddrLen = MYSIZEOF(struct sockaddr_in);
				int len = recvfrom(g_socket.socks[i].isock, g_socket.socks[i].buf, SOCKET_BUFF_LEN,
					0, (struct sockaddr *)&(g_socket.socks[i].last_recv_remote), &iAddrLen);
				if(len < 0) {
					if(g_socket.onError)
						g_socket.onError(g_socket.socks+i, tmcur);
				}
				else if(g_socket.socks[i].onPkg) {
					g_socket.socks[i].onPkg(g_socket.socks+i, g_socket.socks[i].buf, len, tmcur);
				}
			
			}
		}
	}
	return ret;
}

void ModMaxResponseTime(int iSockIdx, int iRespTime)
{
	if(iSockIdx < 0 || iSockIdx >= MT_SOCKET_DEF_COUNT_MAX)
		return;

	if(g_socket.socks[iSockIdx].isock <= 0)
		return;
	if(iRespTime > g_socket.socks[iSockIdx].iMaxRespTimeMs) {
		g_socket.socks[iSockIdx].iMaxRespTimeMs = iRespTime;
		INFO_LOG("modify server:%s:%d max response time:%d",
			inet_ntoa(g_socket.socks[iSockIdx].remote.sin_addr), 
			ntohs(g_socket.socks[iSockIdx].remote.sin_port), iRespTime);
	}
}

int32_t GetMaxResponseTime(int iSockIdx)
{
	if(iSockIdx < 0 || iSockIdx >= MT_SOCKET_DEF_COUNT_MAX)
		return PKG_TIMEOUT_MS;

	if(g_socket.socks[iSockIdx].isock <= 0)
		return PKG_TIMEOUT_MS;
	if(g_socket.socks[iSockIdx].iMaxRespTimeMs <= 0)
		return PKG_TIMEOUT_MS-1000;
	return g_socket.socks[iSockIdx].iMaxRespTimeMs;
}

uint32_t GetSocketAddress(int iSockIdx)
{
	if(iSockIdx < 0 || iSockIdx >= MT_SOCKET_DEF_COUNT_MAX)
		return 0;

	if(g_socket.socks[iSockIdx].isock <= 0)
		return 0;
	return g_socket.socks[iSockIdx].remote.sin_addr.s_addr;
}

void SetSocketAddress(int iSockIdx, uint32_t dwNetAddress, uint16_t wPort)
{
	if(iSockIdx < 0 || iSockIdx >= MT_SOCKET_DEF_COUNT_MAX)
		return ;

	if(g_socket.socks[iSockIdx].isock <= 0)
		return ;
	if(g_socket.socks[iSockIdx].remote.sin_addr.s_addr != dwNetAddress)
		g_socket.socks[iSockIdx].iMaxRespTimeMs = -1;
	g_socket.socks[iSockIdx].remote.sin_addr.s_addr = dwNetAddress;
	if(wPort != 0)
		g_socket.socks[iSockIdx].remote.sin_port = htons(wPort);
}

int SendAckPacket(struct MtSocket *psock, const char *pdata, int iDataLen)
{
	if(psock->isock <= 0)
		return -2;

    socklen_t socklen = sizeof(struct sockaddr_in);
	int ret = sendto(psock->isock,
		pdata, iDataLen, 0, (struct sockaddr *)&(psock->last_recv_remote), socklen);
	if(ret != iDataLen && g_socket.onError)
		g_socket.onError(psock, time(NULL));
	return ret;
}

int DealUdpPkgHeadAutoRetry(ReqPkgHead *pHead, struct sockaddr_in *pDestAddr)
{
    pHead->cUdpEnableAutoRetry = 1;
    int iPackIdx = -1;
    uint32_t dwCmd = ntohl(pHead->dwCmd);
    int iRet = g_socket.pack.GetPackInfoIndex(dwCmd, pDestAddr, &iPackIdx);
    if(iRet < 0) {
        if(iPackIdx < 0) {
            ERROR_LOG("need more udp packet info space !");
            return -1;
        }
        else {
            g_socket.pack.InitPackInfo(dwCmd, pDestAddr, iPackIdx);
            INFO_LOG("init udp auto retry info index:%d - cmd:%u, server:%s:%d", iPackIdx,
                dwCmd, inet_ntoa(pDestAddr->sin_addr), ntohs(pDestAddr->sin_port));
        }
    }

    if(iPackIdx >= 0) {
        if(pHead->bResendTimes > 0) {
            if(g_socket.pack.StartUdpRetry(iRet, true))
                INFO_LOG("restart udp auto retry - cmd:%d, server:%s:%d", 
                    dwCmd, inet_ntoa(pDestAddr->sin_addr), ntohs(pDestAddr->sin_port));
        }
        else {
            if(g_socket.pack.NeedRetry(pHead, iPackIdx))
                pHead->cNeedAck = 1;
            else
                pHead->cNeedAck = 0;
            pHead->iSendPacks = htonl(g_socket.pack.GetSendPacks(iPackIdx));
        }
    }
    return iPackIdx;
}

int SendPacket(int iSockIdx, struct sockaddr_in *pDestAddr, const char *pdata, int iDataLen, bool bEnableUdpAutoRetry)
{
	if(iSockIdx < 0 || iSockIdx >= MT_SOCKET_DEF_COUNT_MAX || !pdata || iDataLen <= 0)
		return -1;

	if(g_socket.socks[iSockIdx].isock <= 0)
		return -2;

	int ret = 0;
	struct sockaddr_in remote_addr;
	socklen_t socklen = MYSIZEOF(struct sockaddr_in);

	if(pDestAddr != NULL 
		&& memcmp(pDestAddr, &g_socket.socks[iSockIdx].remote, MYSIZEOF(struct sockaddr_in)))
	{
		memcpy(&g_socket.socks[iSockIdx].remote, pDestAddr, MYSIZEOF(struct sockaddr_in));
		DEBUG_LOG("update socket:%d remote address to:%s", 
			iSockIdx, inet_ntoa(g_socket.socks[iSockIdx].remote.sin_addr));
	}

    // 在该函数中集中修改头部中某些字段
    ReqPkgHead *pHead = (ReqPkgHead*)(pdata+1);
    int iPackIdx = -1;
    if(bEnableUdpAutoRetry && stConfig.iUdpAutoRetry) 
        iPackIdx = DealUdpPkgHeadAutoRetry(pHead, &g_socket.socks[iSockIdx].remote);
    memcpy(&remote_addr, &g_socket.socks[iSockIdx].remote, MYSIZEOF(remote_addr));

    DEBUG_LOG("try send udp packet, sock index:%d, remote:%s:%d", iSockIdx,
        inet_ntoa(g_socket.socks[iSockIdx].remote.sin_addr), ntohs(g_socket.socks[iSockIdx].remote.sin_port));
    ret = sendto(g_socket.socks[iSockIdx].isock, pdata, iDataLen, 0, (struct sockaddr *)&remote_addr, socklen);
	if(ret != iDataLen && g_socket.onError) {
		ERROR_LOG("sendto failed, packet len:%d to %s:%d, ret:%d, msg:%s", iDataLen, 
			inet_ntoa(g_socket.socks[iSockIdx].remote.sin_addr),
			ntohs(g_socket.socks[iSockIdx].remote.sin_port), ret, strerror(errno));
		g_socket.onError(g_socket.socks+iSockIdx, time(NULL));
        return -5;
	}
    if(bEnableUdpAutoRetry && iPackIdx >= 0)
        g_socket.pack.AddSendPacks(iPackIdx);
	return iDataLen;
}

void InitSocketComm(int iSock)
{
	int iFlags = 0; 
	iFlags = fcntl(iSock, F_GETFL, 0);
	iFlags |= O_NONBLOCK;
	iFlags |= O_NDELAY; 
	fcntl(iSock, F_SETFL, iFlags); 

	int iReuseAddr = 1;
	if(setsockopt(iSock, SOL_SOCKET, SO_REUSEADDR, &iReuseAddr, MYSIZEOF(iReuseAddr) != 0)) {
	}
	int iSockBufLen = 512*1024;
	socklen_t optlen = MYSIZEOF(iSockBufLen);
	setsockopt(iSock, SOL_SOCKET, SO_RCVBUF, &iSockBufLen, optlen);
	setsockopt(iSock, SOL_SOCKET, SO_SNDBUF, &iSockBufLen, optlen);
}

void CPacketInfo::InitPackInfo(uint32_t cmd, struct sockaddr_in *pDestAddr, int iPackIdx)
{
    if(iPackIdx < 0 || iPackIdx >= MT_SOCKET_DEF_COUNT_MAX) {
        ERROR_LOG("invalid index:%d", iPackIdx);
        return;
    }

    m_pkInfo[iPackIdx].Reset();
    m_pkInfo[iPackIdx].dwLastAddTime = stConfig.dwCurTime;
    m_pkInfo[iPackIdx].dwRemoteAddr = pDestAddr->sin_addr.s_addr;
    m_pkInfo[iPackIdx].wRemotePort = pDestAddr->sin_port;
    m_pkInfo[iPackIdx].dwReqCmd = cmd;

    // client/server 重新开始统计包量
    m_pkInfo[iPackIdx].iSendPacks = -1;
}

void CPacketInfo::OutPackInfo()
{
    for(int i=0; i < MT_SOCKET_DEF_COUNT_MAX; i++) {
        if(!m_pkInfo[i].IsFree()) {
            DEBUG_LOG("-------- packet info:%d ----------------------", i);
            _PackInfo *pstr = m_pkInfo+i;
            LOG_SHOW_FIELD_VALUE_UINT_TIME(dwLastAddTime);
            LOG_SHOW_FIELD_VALUE_UINT_IP(dwRemoteAddr);
            DEBUG_LOG("\twRemotePort:%d", ntohs(pstr->wRemotePort));
            LOG_SHOW_FIELD_VALUE_UINT(dwReqCmd);
            LOG_SHOW_FIELD_VALUE_UINT_TIME(dwRetryStartTime);
            LOG_SHOW_FIELD_VALUE_INT(iSendPacks);
            LOG_SHOW_FIELD_VALUE_INT(iRecvAckCur);
            DEBUG_LOG("\n");
        }
    }
}

int CPacketInfo::GetPackInfoIndex(uint32_t cmd, struct sockaddr_in *pDestAddr, int *pNewIdx)
{
    int i = 0;
    if(pNewIdx)
        *pNewIdx = -1;
    for(i=0; i < MT_SOCKET_DEF_COUNT_MAX; i++) {
        if(m_pkInfo[i].IsEqual(pDestAddr->sin_addr.s_addr, pDestAddr->sin_port, cmd)) {
            if(pNewIdx)
                *pNewIdx = i;
            return i;
        }
        else if(pNewIdx && *pNewIdx < 0 && m_pkInfo[i].IsFree())
            *pNewIdx = i;
    }
    return -1;
}

bool CPacketInfo::NeedRetry(ReqPkgHead *pHead, int iPackIdx)
{
    if(iPackIdx < 0 || iPackIdx >= MT_SOCKET_DEF_COUNT_MAX) {
        ERROR_LOG("invalid index:%d", iPackIdx);
        return true;
    }

    // 从未收到过 server 回包
    if(!m_pkInfo[iPackIdx].bServerHasAck)
        return true;

    // 已自动启用可靠udp，时间和包量检测仍在启用中
    if(m_pkInfo[iPackIdx].dwRetryStartTime+UDP_RETRY_TIME_SECONDS >= stConfig.dwCurTime
        || (m_pkInfo[iPackIdx].dwRetryStartTime > 0 && m_pkInfo[iPackIdx].iSendPacks < UDP_RETRY_PACKS))
    {
        return true;
    }

    // 重新进入无需回包状态
    if(m_pkInfo[iPackIdx].dwRetryStartTime > 0) {
        m_pkInfo[iPackIdx].iSendPacks = -1;
        m_pkInfo[iPackIdx].dwRetryStartTime = 0;
    }
    return false;
}


bool CPacketInfo::StartUdpRetry(int iPackIdx, bool bForce)
{
    if(iPackIdx < 0 || iPackIdx >= MT_SOCKET_DEF_COUNT_MAX) {
        ERROR_LOG("invalid index:%d", iPackIdx);
        return false;
    }

    // 出现重发包，强制重新开始统计
    if(bForce) {
        m_pkInfo[iPackIdx].dwRetryStartTime = stConfig.dwCurTime;
        m_pkInfo[iPackIdx].iSendPacks = -1;
        return true;
    }

    if(m_pkInfo[iPackIdx].dwRetryStartTime+UDP_RETRY_TIME_SECONDS >= stConfig.dwCurTime
        || (m_pkInfo[iPackIdx].dwRetryStartTime > 0 && m_pkInfo[iPackIdx].iSendPacks < UDP_RETRY_PACKS))
        return false;

    // 开启自动可靠udp，包量重新开始统计(接下来一分钟且连续50个数据包无重发才能改变状态)
    m_pkInfo[iPackIdx].dwRetryStartTime = stConfig.dwCurTime;
    m_pkInfo[iPackIdx].iSendPacks = -1;
    return true;
}

void CPacketInfo::AddRecvAck(int iPackIdx) 
{
    if(iPackIdx < 0 || iPackIdx >= MT_SOCKET_DEF_COUNT_MAX)
        return;
    if(m_pkInfo[iPackIdx].iRecvAckCur < 0)
        m_pkInfo[iPackIdx].iRecvAckCur = 1;
    else
        m_pkInfo[iPackIdx].iRecvAckCur++;
    m_pkInfo[iPackIdx].bServerHasAck = true;
}

void CPacketInfo::AddSendPacks(int iPackIdx) 
{
    if(iPackIdx < 0 || iPackIdx >= MT_SOCKET_DEF_COUNT_MAX)
        return;

    // iSendPacks < 0 表示溢出了，client/server 重新统计包量
    if(m_pkInfo[iPackIdx].iSendPacks < 0)
        m_pkInfo[iPackIdx].iSendPacks = 1;
    else
        m_pkInfo[iPackIdx].iSendPacks++;
    m_pkInfo[iPackIdx].dwLastAddTime = stConfig.dwCurTime;
}

bool CPacketInfo::_PackInfo::IsFree()
{
    // 超时1小时未使用回收节点
    return (dwLastAddTime+3600 < stConfig.dwCurTime || dwRemoteAddr==0);
}

CircularBuffer::CircularBuffer(size_t size)
:buf(new char[2 * size])
,m_max(size)
,m_q(0)
,m_b(0)
,m_t(0)
,m_count(0)
{
}

CircularBuffer::~CircularBuffer()
{
    delete[] buf; 
}


bool CircularBuffer::Write(const char *s,size_t l)
{
    if (m_q + l > m_max)
    {
        return false;
    }
    m_count += (unsigned long)l;
    if (m_t + l > m_max)
    {
        size_t l1 = m_max - m_t; 
        memcpy(buf + m_t, s, l);
        memcpy(buf, s + l1, l - l1);
        m_t = l - l1;
        m_q += l;
    }
    else
    {
        memcpy(buf + m_t, s, l);
        memcpy(buf + m_max + m_t, s, l);
        m_t += l;
        if (m_t >= m_max)
            m_t -= m_max;
        m_q += l;
    }
    return true;
}

bool CircularBuffer::Read(char *s,size_t l)
{
    if (l > m_q)
    {
        return false; 
    }
    if (m_b + l > m_max) 
    {
        size_t l1 = m_max - m_b;
        if (s)
        {
            memcpy(s, buf + m_b, l1);
            memcpy(s + l1, buf, l - l1);
        }
        m_b = l - l1;
        m_q -= l;
    }
    else
    {
        if (s)
        {
            memcpy(s, buf + m_b, l);
        }
        m_b += l;
        if (m_b >= m_max)
            m_b -= m_max;
        m_q -= l;
    }
    if (!m_q)
    {
        m_b = m_t = 0;
    }
    return true;
}

bool CircularBuffer::Remove(size_t l)
{
    return Read(NULL, l);
}

size_t CircularBuffer::GetLength()
{
    return m_q;
}

const char *CircularBuffer::GetStart()
{
    return buf + m_b;
}


size_t CircularBuffer::GetL()
{
    return (m_b + m_q > m_max) ? m_max - m_b : m_q;
}

size_t CircularBuffer::Space()
{
    return m_max - m_q;
}

unsigned long CircularBuffer::ByteCounter(bool clear)
{
    if (clear)
    {
        unsigned long x = m_count;
        m_count = 0;
        return x;
    }
    return m_count;
}

std::string CircularBuffer::ReadString(size_t l)
{
    char *sz = new char[l + 1];
    if (!Read(sz, l)) 
    {
        delete[] sz;
        return "";
    }
    sz[l] = 0;
    std::string tmp = sz;
    delete[] sz;
    return tmp;
}




